package fun.ticsmyc.rpc.client.transport.bio;

import fun.ticsmyc.rpc.Config;
import fun.ticsmyc.rpc.client.transport.RpcClient;
import fun.ticsmyc.rpc.client.util.RpcMessageChecker;
import fun.ticsmyc.rpc.common.entity.RpcRequest;
import fun.ticsmyc.rpc.common.entity.RpcResponse;
import fun.ticsmyc.rpc.common.entity.TRPCServiceProperties;
import fun.ticsmyc.rpc.common.enumeration.RpcError;
import fun.ticsmyc.rpc.common.exception.RpcException;
import fun.ticsmyc.rpc.common.serializer.Serializer;
import fun.ticsmyc.rpc.common.serializer.Serializers;
import fun.ticsmyc.rpc.nacos.registry.ServiceDiscovery;
import fun.ticsmyc.rpc.nacos.registry.impl.NacosServiceDiscoveryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Blocking-I/O (BIO) implementation of {@link RpcClient}.
 *
 * <p>Every call to {@link #sendRequest} opens a new {@link Socket} to the address returned by
 * service discovery, writes one framed request, reads one framed response and closes the socket.
 *
 * <p>Frame layout, as written by {@code sendRpcRequest} and parsed by {@code getRpcResponse}:
 * <pre>
 *   4 bytes  magic number (0xCAFEBABE), big-endian
 *   1 byte   protocol version
 *   1 byte   serializer id
 *   4 bytes  payload length, big-endian
 *   N bytes  serialized payload
 * </pre>
 *
 * @author Ticsmyc
 * @date 2020-10-23 10:51
 */
public class BioRpcClient implements RpcClient {
    /** Marker that starts every protocol frame. */
    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    /** Highest protocol version this client is able to parse. */
    private static final byte CURRENT_PROTOCOL_VERSION = 1;
    private static final Logger logger = LoggerFactory.getLogger(BioRpcClient.class);

    private static ServiceDiscovery serviceDiscovery;
    private static Serializer serializer;

    private static volatile BioRpcClient INSTANCE;

    private BioRpcClient() {
    }

    /**
     * Returns the lazily created singleton instance.
     *
     * <p>{@link #init()} runs <em>before</em> the instance is stored in the volatile field, so a
     * thread that observes a non-null {@code INSTANCE} also observes the initialised
     * {@code serviceDiscovery} and {@code serializer}.
     *
     * @return the shared client instance
     */
    public static BioRpcClient getInstance() {
        if (INSTANCE == null) {
            synchronized (BioRpcClient.class) {
                if (INSTANCE == null) {
                    // Initialise collaborators first; publishing INSTANCE is the last step.
                    init();
                    INSTANCE = new BioRpcClient();
                }
            }
        }
        return INSTANCE;
    }

    /**
     * Sends {@code rpcRequest} to a provider of the requested service and returns the response
     * payload.
     *
     * @param rpcRequest            the request to send
     * @param trpcServiceProperties service properties; its signature is used for service lookup
     * @return the data carried by the response, or {@code null} when the call failed with an
     *         {@link IOException} (including a connection closed before a full frame arrived)
     * @throws RpcException if the response frame has an unknown magic number, an unsupported
     *                      protocol version, an unknown serializer or a negative payload length
     */
    @Override
    public Object sendRequest(RpcRequest rpcRequest, TRPCServiceProperties trpcServiceProperties) {
        InetSocketAddress address = serviceDiscovery.lookupService(trpcServiceProperties.getSignature());
        // getHostString() never triggers a reverse DNS lookup, unlike getHostName().
        String host = address.getHostString();
        int port = address.getPort();
        // Closing the socket also closes its input and output streams.
        try (Socket socket = new Socket(host, port)) {
            // Write the request.
            sendRpcRequest(socket.getOutputStream(), rpcRequest);
            // Read the response.
            RpcResponse<Object> rpcResponse = getRpcResponse(socket.getInputStream());
            RpcMessageChecker.check(rpcRequest, rpcResponse);
            return rpcResponse.getData();
        } catch (IOException e) {
            logger.error("网络IO传输错误", e);
        }
        // The call failed: return null.
        return null;
    }

    /**
     * Serializes {@code rpcRequest} and writes it as a single frame.
     *
     * @param outputStream stream to write to
     * @param rpcRequest   request to serialize
     * @throws IOException if writing fails
     */
    private void sendRpcRequest(OutputStream outputStream, RpcRequest rpcRequest) throws IOException {
        byte[] payload = serializer.serialize(rpcRequest);
        outputStream.write(intToBytes(MAGIC_NUMBER));
        outputStream.write(new byte[]{CURRENT_PROTOCOL_VERSION});
        outputStream.write(new byte[]{serializer.getId()});
        outputStream.write(intToBytes(payload.length));
        outputStream.write(payload);
        outputStream.flush();
    }

    /**
     * Reads and decodes one response frame.
     *
     * @param inputStream stream to read from
     * @return the deserialized response
     * @throws IOException  if reading fails or the stream ends before the frame is complete
     * @throws RpcException if the frame header is not acceptable (see {@link #sendRequest})
     */
    @SuppressWarnings("unchecked") // the payload is deserialized with RpcResponse.class
    private RpcResponse<Object> getRpcResponse(InputStream inputStream) throws IOException {
        // Magic number.
        byte[] numberBytes = new byte[4];
        readFully(inputStream, numberBytes);
        int magic = byteToInt(numberBytes);
        if (magic != MAGIC_NUMBER) {
            logger.error("不识别的协议包,{}", magic);
            throw new RpcException(RpcError.UNKNOW_PROTOCOL);
        }

        // Protocol version.
        byte[] protocolVersion = new byte[1];
        readFully(inputStream, protocolVersion);
        if (protocolVersion[0] > CURRENT_PROTOCOL_VERSION) {
            logger.error("不能解析的协议版本:{}", protocolVersion[0]);
            throw new RpcException(RpcError.UNKNOW_PROTOCOL);
        }

        // Serializer: the response names its own, which may differ from the request's.
        byte[] serializerCode = new byte[1];
        readFully(inputStream, serializerCode);
        Serializer responseSerializer = Serializers.getSerializerByCode(serializerCode[0]);
        if (responseSerializer == null) {
            logger.error("不能识别的序列化器,{}", serializerCode[0]);
            throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
        }

        // Payload length and payload.
        readFully(inputStream, numberBytes);
        int length = byteToInt(numberBytes);
        if (length < 0) {
            // A negative length cannot be allocated; treat it as a malformed packet.
            logger.error("不识别的协议包,{}", length);
            throw new RpcException(RpcError.UNKNOW_PROTOCOL);
        }
        byte[] payload = new byte[length];
        readFully(inputStream, payload);
        return (RpcResponse<Object>) responseSerializer.deserialize(payload, RpcResponse.class);
    }

    /**
     * Fills {@code buffer} completely from {@code in}.
     *
     * <p>A single {@link InputStream#read(byte[])} may return fewer bytes than requested, so the
     * read is repeated until the buffer is full.
     *
     * @param in     stream to read from
     * @param buffer destination; filled from index 0 to its full length
     * @throws IOException if reading fails or the stream ends before the buffer is full
     */
    private static void readFully(InputStream in, byte[] buffer) throws IOException {
        int offset = 0;
        while (offset < buffer.length) {
            int count = in.read(buffer, offset, buffer.length - offset);
            if (count < 0) {
                throw new IOException("Unexpected end of stream: expected " + buffer.length
                        + " bytes but received " + offset);
            }
            offset += count;
        }
    }

    /**
     * Decodes the first four bytes of {@code src} as a big-endian int.
     *
     * @param src array holding at least four bytes
     * @return the decoded value
     */
    private static int byteToInt(byte[] src) {
        return ((src[0] & 0xFF) << 24)
                | ((src[1] & 0xFF) << 16)
                | ((src[2] & 0xFF) << 8)
                | (src[3] & 0xFF);
    }

    /**
     * Encodes {@code value} as four big-endian bytes.
     *
     * @param value the int to encode
     * @return a new four-byte array
     */
    private static byte[] intToBytes(int value) {
        byte[] src = new byte[4];
        src[0] = (byte) ((value >> 24) & 0xFF);
        src[1] = (byte) ((value >> 16) & 0xFF);
        src[2] = (byte) ((value >> 8) & 0xFF);
        src[3] = (byte) (value & 0xFF);
        return src;
    }

    /**
     * Initialises the shared collaborators: a Nacos-backed service discovery built with the
     * configured load balancer, and the configured serializer.
     */
    public static void init() {
        serviceDiscovery = new NacosServiceDiscoveryImpl(Config.getLoadBalancer());
        serializer = Config.getSerializer();
    }
}
